Stream App

What is a Stream App?

A stream app is an app that runs continuously on the server for the duration of the active data stream. Stream apps can be time or depth based.

The app receives real-time data in the form of events, each of which contains one or more records. It receives those events whenever the data provider delivers data to Corva.

The records are typically continuous one-second or one-foot data, but can arrive with latency.

The stream app receives corva#wits data for drilling (time based), corva#drilling.wits.depth data for drilling (depth based), corva#completion.wits data for completions, and corva#wireline.wits data for wireline.

The stream app can access records from the stream event as well as individual indexes and key values such as asset_id, company_id, stage number (completions), measured depth (drilling depth), log identifier (drilling depth), or other indexes and key values included in the event record.
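As a quick illustration, these identifiers can be read straight off the event object inside a processor. The sketch below uses a hand-built mock event whose shape mirrors the corva#wits sample shown later on this page; `summarizeEvent` is a hypothetical helper, not part of the SDK:

```javascript
// Sketch: reading common identifiers and indexes off a stream event.
// summarizeEvent is a hypothetical helper for illustration only.
const summarizeEvent = (event) => ({
  assetId: event.asset_id,
  companyId: event.company_id,
  recordCount: event.records.length,
  firstTimestamp: event.records[0]?.timestamp,
});

// Mock event whose shape mirrors the corva#wits sample below.
const event = {
  asset_id: 123456,
  company_id: 1,
  records: [{ timestamp: 1436941080, data: { bit_depth: 6921.1, hole_depth: 6923.5 } }],
};

console.log(summarizeEvent(event));
// { assetId: 123456, companyId: 1, recordCount: 1, firstTimestamp: 1436941080 }
```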


When to build a Stream App?

Stream apps are most frequently used for real-time operational applications. Typical use cases are those where your business logic requires real-time data at regular intervals of one second or fractions of a foot. Most use cases are visualizations that require a detailed, granular look at the incoming data or that might impact real-time operational decision making.

  • I require real-time data

  • I require regularly sampled data with interval = 1s or fractions of 1ft

  • I require the app to be provisioned to the asset stream

  • I do not require the app to be invoked from a front-end app

  • I do not require the app to be triggered

If you do not require real-time data at one-second or one-foot intervals (or the next available measurement), you should use a Scheduled App.


Where to find the data to build a Stream App?

The Corva Dev Center Dataset Explorer is an easy way to see a list of Corva datasets and the data stored in each one. Another option is to use Corva's API tools, located on the Dev Center Intro page, to make a GET request to your desired dataset.

Stream apps only run on active streams, for example wits, drilling.wits.depth, completion.wits or wireline.wits. To determine what data is available, review the current wits data for the relevant stream: drilling time based (corva#wits), drilling depth based (corva#drilling.wits.depth), completions (corva#completion.wits), or wireline (corva#wireline.wits). The data channels Corva receives may vary from well to well depending on the data provider. See examples of each type of stream data below.
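As a sketch of the GET-request approach, the snippet below builds a Data API URL for recent corva#wits records. The `api/v1/data/{provider}/{dataset}/` route matches the one used by the app code later in this guide; the host, query parameters, and Authorization header format are illustrative assumptions, so check the API tools on the Dev Center Intro page for the exact details:

```javascript
// Sketch: building a Data API GET request for recent corva#wits records.
// The route pattern matches the app code later in this guide; the host,
// query parameters, and auth header format are illustrative assumptions.
const buildDatasetUrl = (provider, dataset, params) => {
  const query = new URLSearchParams(params).toString();
  return `https://data.corva.ai/api/v1/data/${provider}/${dataset}/?${query}`;
};

const url = buildDatasetUrl('corva', 'wits', {
  query: JSON.stringify({ asset_id: 123456 }),
  sort: JSON.stringify({ timestamp: -1 }),
  limit: 10,
});

// Uncomment to issue the request with a real API key:
// fetch(url, { headers: { Authorization: `API ${process.env.CORVA_API_KEY}` } })
//   .then((res) => res.json())
//   .then((records) => console.log(records));
```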

corva#wits (Drilling Time Based)
[
{
"_id": "6144da0ef249cd412db8f4f4",
"version": 1,
"provider": "corva",
"collection": "wits",
"timestamp": 1436941080,
"asset_id": 123456,
"company_id": 1,
"app": "corva.app",
"data_raw": {},
"metadata": {
"drillstring": "6144b8919c07fd21d8cb184b",
"casing": null,
"mud": "6144b9f836cfd72a42a97542",
"cuttings": null,
"surface-equipment": "6144b9e46764c92dff879d33",
"actual_survey": "6144ba975140c7244f842905",
"plan_survey": null
},
"data": {
"entry_at": 1436941080,
"mud g/l alarm state": 0,
"total mud volume": 0,
"trip tank mud volume": 0,
"line wear": 9313,
"totalpumpdisplacement": 0,
"on bottom hours": 24.1,
"circulating hours": 35.5,
"tool face": 1234,
"inclination": 2.9,
"azimuth": 154.06,
"h2s": 1,
"nitrogen pressure in": 1,
"nitrogen volume in": 0.9,
"hydrocarbon flow": 0,
"mwd temperature": 0,
"mwd vibration count": 0,
"mwd general variable 1": 0,
"condensate out": 0.84,
"mwd general variable 3": 0,
"mwd general variable 4": 0,
"mwd vibration xy gamma": 0,
"mwd vibration z gamma": 0,
"over pull": 0,
"fill strokes": 0,
"total fill strokes": 0,
"mwd general variable 7": 0,
"mwd general variable 8": 0,
"mwd general variable 9": 0,
"mwd general variable 10": 0,
"min wob": 0,
"surface stick slip index": 0,
"mwd dynamic inc": 0,
"mwd dynamic azi": 0,
"mwd vibration xyz gamma": 0,
"cement fluid temp": 0,
"mwd vibration xyz": 0,
"autodriller status 2 uw": 0,
"nitrogen volume out": 0.5,
"autodriller status 2 lw": 0,
"trip tank 1 low threshold": 0,
"trip tank 1 high threshold": 0,
"total gas return": 1,
"trip tank 2 low threshold": 0,
"trip tank 2 high threshold": 0,
"sensor depth": 0,
"bit rpm": 0,
"pvt total mud gain/loss": 0,
"mwd general variable 2": 0,
"wits lag depth": 0,
"pason lag depth": 6923.4,
"autodriller drum ticks": 0,
"flow data air (cmf)": 0,
"flow data pressure (psig)": 0,
"flow data temp (f)": 89,
"tts weight on bit": 0,
"drilling activity": 0,
"min tts weight on bit": 0,
"trip speed": 0,
"pump 4 total strokes": 0,
"autodriller status": 0,
"autodriller sensitivity": 0,
"convertible torque": 0,
"min convertible torque": 0,
"mwd general variable 5": 0,
"mwd general variable 6": 0,
"porosity 2 depth": 432,
"formation density depth": 0.51,
"min pressure": 0,
"min hook load": 0,
"min torque": 0,
"min rpm": 0,
"mwd general variable 0": 0,
"relative mse": 0,
"motor rpm": 0,
"porosity 1 depth": 0.87,
"autodriller wob": 0,
"autodriller brake pos": 0,
"autodriller diff press": 0,
"autodriller status uw": 0,
"autodriller status lw": 0,
"autodriller off bottom rop limit": 0,
"autodriller ticks per depth": 0,
"pvt mud tanks included": 0,
"pvt monitor mud gain/loss": 0,
"pvt mud g/l threshold": 0,
"trip tank 1 refill status": 0,
"flow 1 g/l threshold": 0,
"xy accel_ severity level": 0,
"z accel_ severity level": 0,
"trip tank fill": 0,
"trip tank accum": 0,
"rate of penetration": 0,
"time of penetration": 0,
"memos": "WELL CAPPED",
"bit_depth": 6921.1,
"block_height": 21.6,
"hole_depth": 6923.5,
"diff_press": 0,
"hook_load": 0,
"rop": 0,
"rotary_rpm": 0,
"pump_spm_1": 0,
"pump_spm_2": 0,
"standpipe_pressure": 0,
"rotary_torque": 0,
"pump_spm_total": 0,
"mud_flow_in": 0,
"strks_total": 0,
"strks_pump_3": 0,
"weight_on_bit": 0,
"strks_pump_1": 0,
"strks_pump_2": 0,
"ad_rop_setpoint": 0,
"ad_wob_setpoint": 0,
"gravity_tool_face": 26,
"magnetic_tool_face": 234,
"ad_diff_press_setpoint": 0,
"gamma_ray": 0,
"true_vertical_depth": 6890.78,
"state": "In Slips"
}
}
]
corva#drilling.wits.depth (Drilling Depth Based)
[
{
"_id":"638e08ad591fc9530bc95021",
"version":1,
"provider":"corva",
"collection":"drilling.wits.depth",
"asset_id":123456,
"data":{
"dep":5.23,
"ctda":0,
"ctdi":0.00000572499837536711,
"hdtv":5.22999999999999,
"remarks":null,
"tvd":5.2
},
"measured_depth":5.2,
"timestamp_read":1670252838,
"company_id":1,
"app":"corva.witsml-depth-source",
"log_identifier":"222f83a8ad8b"
},
{
"_id":"638e1914591fc9530bc97ec0",
"version":1,
"provider":"corva",
"collection":"drilling.wits.depth",
"measured_depth":4780,
"timestamp_read":1670256913,
"asset_id":123456,
"company_id":1,
"app":"corva.witsml-depth-source",
"log_identifier":"16a57de71bb2",
"data":{
"dep": 4780,
"hob": 2610.4,
"bitf": 2125,
"ctda": 294.647081162492,
"ctdi": 9.49662567605937,
"hdtv": 4740.39582281383,
"ropa": 29.4941738247871,
"dmiavg": 12.3505077891987,
"dmoavg": 0,
"fliavg": 1300.42742359428,
"floavg": 910.745726579475,
"ghcavg": 0.083685715571959,
"ghcmax": 0.108048261328046,
"hkldav": 128.081843750071,
"lagdep": 4764.64990234375,
"sppavg": 3456.2018385,
"tmiavg": 117.404556274414,
"tmoavg": 117.404556274414,
"tqabav": 6.77594677734375,
"wobavg": 10.4961884765684,
"rpmsavg": 168.743540454799,
"remarks": null,
"gcc3av": 3.77634501457214,
"gcc5nea": 0,
"gcc4na": 0,
"gcc5na": 0,
"gcc4ia": 0,
"gcc2av": 11.1370010375977,
"gcc1av": 1043.97717285156,
"gcc5ia": 0,
"tvd": 4740.45
}
}
]
corva#completion.wits (Completions)
[
{
"_id": "614bb07772ac9e2516ed92d2",
"version": 1,
"provider": "corva",
"collection": "completion.wits",
"timestamp": 1447635400,
"asset_id": 123456,
"company_id": 1,
"stage_number": 28,
"data": {
"timestamp": 1447635400,
"is_valid": true,
"wellhead_pressure": 0,
"slurry_flow_rate_in": 0,
"elapsed_time": 8260,
"clean_flow_rate_in": 0,
"total_clean_volume_in": 7467.806170953469,
"total_slurry_volume_in": 7873.681666666418,
"total_chemical_rate_in": 0,
"total_friction_reducer": 0,
"total_proppant_concentration": 0,
"total_proppant_mass": 376994.62118233106,
"bottomhole_proppant_concentration": 0,
"hydrostatic_pressure": 3243.6517636200106,
"inverse_hydrostatic_pressure": 0.0003082945004194811,
"backside_pressure": 4,
"enzyme_breaker": 0,
"scale_inhibitor": 0,
"surfactant": 0,
"friction_reducer": 0,
"friction_reducer_extra": 0,
"cross_linker": 0,
"acid": 0,
"gel": 0,
"ph_adjusting_agent": 0,
"accelerator": 0,
"fluid_loss": 0,
"ploymer_plug": 0,
"acid_inhibitor": 0,
"acid_retarder": 0,
"emulsifier": 0,
"clay_stabilizer": 0,
"non_emulsifier": 0,
"fines_suspender": 0,
"anti_sludge": 0,
"iron_control": 0,
"oxygen_scavenger": 0,
"mutual_solvent": 0,
"corrosion_inhibitor": 0,
"paraffin_control": 0,
"biocide": 0,
"instant_crosslinker": 0,
"delayed_crosslinker": 0,
"liquid_breaker": 0,
"powder_breaker": 0,
"divertor": 0,
"powder_gel": 0,
"powder_friction_reducer": 0,
"powder_enzyme_breaker": 0,
"proppant_1_concentration": 0,
"proppant_2_concentration": 0,
"tr press deriv": 0.54,
"slur rate2": 0,
"prop con2": 0,
"bh prop con": 0,
"tmt 100 mesh sand": 137414,
"tmt 40/70 white": 237635,
"j623 conc": 0,
"j624 conc": 1.6,
"pod cr dry conc raw": 374.1,
"an press 1h": 4,
"end_of_stage": 1,
"proppant_1_mass": 375049,
"pumpside_pressure": -14,
"extra_clean_fluid": 0
}
}
]
corva#wireline.wits (Wireline)
[
{
"_id": "5f161be7de7b936052e8bd5e",
"timestamp": 1574279322,
"version": 1,
"provider": "corva",
"collection": "wireline.wits",
"asset_id": 123456,
"company_id": 1,
"stage_number": 2,
"app": "corva.app",
"data": {
"c_006": -0.5522,
"c_007": -999.25,
"current": 0,
"voltage": 0,
"line_speed": 0.05,
"elapsed_time": 1119.9359,
"line_tension": 320.5558,
"measured_depth": 274.7967,
"casing_collar_locator": -0.08,
"state": "Unclassified",
"wellbore_orientation": "Vertical"
}
}
]

How to build a Stream App?

The following example demonstrates the basic building blocks of a stream app while making use of the imported SDK functionality.


1. Stream Time Event

The use case for the example below: the app needs to retrieve one-second hookload and weight-on-bit data from the corva#wits StreamTimeEvent, add the hookload and weight on bit values, then POST the calculated values to a custom dataset named example-stream-time-app.


1.1 Follow the Getting Started Tutorial to install the prerequisites, then create the app and copy it to your local machine.


1.2. Open and review the README.md file.

# Dev Center Node.js Real-Time Stream Data App

## Getting started

### 1. Install Dependencies


npm install


### 2. Run Tests


npm test


### 3. Deploy

Run `npm run bundle` to create a zip package that can be uploaded to Dev Center

Example of a README.md file


1.3 Make adjustments to the manifest.json file

{
  "format": 1,
  "license": {
    "type": "MIT",
    "url": "https://www.oandgexample.com/license/"
  },
  "developer": {
    "name": "O&G Company",
    "identifier": "oandgc",
    "authors": []
  },
  "application": {
    "type": "stream",
    "key": "sample.nodejs_stream_app",
    "visibility": "private",
    "name": "Nodejs Stream App",
    "description": "This is the description of my app. You can do great things with it!",
    "summary": "More information about this app goes here",
    "category": "analytics",
    "website": "https://www.oandgexample.com/my-app/",
    "segments": [
      "drilling"
    ]
  },
  "settings": {
    "entrypoint": {
      "file": "index.js",
      "function": "handler"
    },
    "timeout": 120,
    "memory": 128,
    "environment": {},
    "runtime": "nodejs20.x",
    "app": {
      "log_type": "time"
    }
  },
  "datasets": {}
}

Example of a manifest.json file for a StreamTimeEvent app


1.3.1 Optional: Set read and/or write permissions for datasets in the manifest.json file.

Read and write permissions can only be granted to company datasets (non-Corva), e.g. sample#sample-nodejs-stream-app-dataset. Read-only permissions can be granted to Corva datasets, e.g. corva#wits. The example below shows how to grant read and write permissions to sample#sample-nodejs-stream-app-dataset.

"datasets": {
  "sample.sample-nodejs-stream-app-dataset": {
    "permissions": [
      "read",
      "write"
    ]
  }
}

Example of dataset read/write permissions in the manifest.json file for a StreamTimeEvent app


1.3.2 Optional: Increase the application timeout time in the manifest.json file

If your app regularly runs longer than its timeout, first make sure the Lambda function is idempotent, so that retried invocations do not duplicate work. If the app legitimately needs more time, increase the timeout value. The default timeout value for an application is 120 seconds, and the maximum is 900 seconds. The example below shows the application timeout increased to 240 seconds.

  "settings": {
    "entrypoint": {
      "file": "index.js",
      "function": "handler"
    },
    "timeout": 240,
    "memory": 128,
    "environment": {},
    "runtime": "nodejs20.x",
    "app": {
      "log_type": "time"
    }
  },

Example of app timeout setting in the manifest.json file for a StreamTimeEvent app

Note: Please refer to AWS Lambda's documentation for Lambda best practices: Best practices for working with AWS Lambda functions.
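Idempotency here usually means guarding against reprocessing the same records on a retried invocation. The app in section 1.4 does this by caching the last exported timestamp; the sketch below shows that guard in isolation (`filterNewRecords` is a hypothetical helper):

```javascript
// Sketch: skip records at or before the last timestamp already exported,
// so a retried invocation does not duplicate work. filterNewRecords is a
// hypothetical helper; in an app the cached value would come from context.cache.
const filterNewRecords = (records, lastExportedTimestamp) =>
  records.filter((record) => record.timestamp > lastExportedTimestamp);

const records = [
  { timestamp: 100, data: {} },
  { timestamp: 101, data: {} },
  { timestamp: 102, data: {} },
];

// If records up to timestamp 101 were already exported,
// only the newer record is processed on the retry.
console.log(filterNewRecords(records, 101)); // [ { timestamp: 102, data: {} } ]
```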


1.3.3 Optional: Increase the application memory in the manifest.json file

If your app is importing large code libraries, completing memory-intensive tasks, or running much slower than expected, then you may need to increase the memory setting. The default (and minimum) memory value for an application is 128 MB. The maximum value is 10,240 MB. The example below shows the application memory increased, in 128 MB increments, to 640 MB.

  "settings": {
    "entrypoint": {
      "file": "index.js",
      "function": "handler"
    },
    "timeout": 240,
    "memory": 640,
    "environment": {},
    "runtime": "nodejs20.x",
    "app": {
      "log_type": "time"
    }
  },

Example of app memory setting in the manifest.json file for a StreamTimeEvent app

Warning: Setting the application memory higher than needed results in higher costs. AWS charges based on the memory you configure, not the memory actually used.

Note: Please refer to AWS Lambda's documentation for Lambda best practices: Best practices for working with AWS Lambda functions.


1.3.4 Optional: Change the Log Level in the manifest.json file

As apps are executed very frequently (about once a second), unlimited logging can produce huge amounts of data. The corva-sdk provides a Logger object, which is a safe way to log from an app.

The Logger is a pino instance and should be used like any other logger.

The Logger has the following features:

  • Log messages are injected with contextual information, which makes it easy to filter through logs while debugging issues.
  • Log messages have a limited length. Messages that are too long are truncated so they do not exceed the limit. The maximum message size can be controlled by the LOG_THRESHOLD_MESSAGE_SIZE env variable. The default value is 1000 characters.
  • The number of log messages is limited. After the limit is reached, logging is disabled. The limit can be controlled by the LOG_THRESHOLD_MESSAGE_COUNT env variable. The default value is 15 messages.
  • The logging level can be set using the LOG_LEVEL env variable. The default value is info; other options are trace, debug, error, and fatal.

  "settings": {
    "entrypoint": {
      "file": "index.js",
      "function": "handler"
    },
    "timeout": 240,
    "memory": 640,
    "environment": { "LOG_LEVEL": "DEBUG" },
    "runtime": "nodejs20.x",
    "app": {
      "log_type": "time"
    }
  },

Example of log level environment variable in the manifest.json file for a StreamTimeEvent app
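To see what the level threshold does, the stand-in logger below emulates pino's level ordering. It is an illustration of the filtering behavior only, not the SDK implementation; in an app you would call context.logger directly:

```javascript
// Sketch: how a LOG_LEVEL threshold filters messages (pino-style ordering).
// This stand-in logger is for illustration only; in an app, use context.logger.
const LEVELS = { trace: 10, debug: 20, info: 30, warn: 40, error: 50, fatal: 60 };

const makeLogger = (level) => {
  const threshold = LEVELS[level];
  const logger = { emitted: [] };
  for (const name of Object.keys(LEVELS)) {
    // Each method records the message only if its level meets the threshold.
    logger[name] = (msg) => {
      if (LEVELS[name] >= threshold) logger.emitted.push(`${name}: ${msg}`);
    };
  }
  return logger;
};

const logger = makeLogger('info');
logger.debug('filtered out below the info threshold');
logger.info('this one is emitted');
console.log(logger.emitted); // [ 'info: this one is emitted' ]
```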


1.4 Implement logic in the index.js file

Now that the app is configured, you can implement the logic in the index.js file.

Note: Implementing the logic directly in index.js is the most basic approach. You also have the option to organize the logic into separate files and directories.

const { Corva } = require('@corva/node-sdk');

/**
 * @param {import('@corva/node-sdk').StreamTimeEvent} event - The event object containing the streaming data.
 * @param {import('@corva/node-sdk').StatefulContext} context - The context object providing API, logging, and cache functionality.
 */

// Main processor function to handle incoming events and process data
const processor = async (event, context) => {
  // Extract the array of records from the event
  const records = event.records;

  // Define the provider and collection names for identifying data in the API
  const providerName = 'sample';
  const collectionName = 'sample-nodejs-stream-app-dataset';

  // Log the received records for debugging purposes
  context.logger.info(`Received records: ${JSON.stringify(records)}`);

  // Extract the start and end timestamps from the records array
  const startTimestamp = event.records[0].timestamp;
  const endTimestamp = event.records[event.records.length - 1].timestamp;

  // Log the timestamps for tracking the range of processed data
  context.logger.info(`Start timestamp: ${JSON.stringify(startTimestamp)}`);
  context.logger.info(`End timestamp: ${JSON.stringify(endTimestamp)}`);

  // Extract company and asset identifiers from the event metadata
  const companyId = event.company_id;
  const assetId = event.asset_id;

  // Log the company and asset IDs for reference
  context.logger.info(`Company ID: ${JSON.stringify(companyId)}`);
  context.logger.info(`Asset ID: ${JSON.stringify(assetId)}`);

  // Retrieve the last exported timestamp from the cache to avoid processing duplicate data
  const lastExportedTimestamp = parseInt(await context.cache.get('last_exported_timestamp')) || 0;

  // Log the last exported timestamp for debugging
  context.logger.info(`Last exported timestamp: ${JSON.stringify(lastExportedTimestamp)}`);

  // Initialize an array to hold the processed output data
  const outputs = [];

  // Iterate through each record in the event
  for (const record of records) {
    // Skip processing records that have already been exported
    if (record.timestamp <= lastExportedTimestamp) {
      continue;
    }

    // Extract hook load and weight on bit values from the record data
    const hookLoad = record.data.hook_load || 0;
    const weightOnBit = record.data.weight_on_bit || 0;

    // Calculate the sum of hook load and weight on bit
    const wobPlusHkld = hookLoad + weightOnBit;

    // Construct the output object for the processed record
    const output = {
      timestamp: record.timestamp,
      company_id: companyId,
      asset_id: assetId,
      provider: providerName,
      collection: collectionName,
      version: 1,
      data: {
        hook_load: hookLoad,
        weight_on_bit: weightOnBit,
        wob_plus_hkld: wobPlusHkld,
      },
    };

    // Add the processed output to the outputs array
    outputs.push(output);
  }

  // Check if there are outputs to send
  if (outputs.length > 0) {
    // Log the outputs for debugging before sending
    context.logger.debug(`Processed outputs: ${JSON.stringify(outputs)}`);

    // Send the processed outputs to the API endpoint
    await context.api
      .request(`api/v1/data/${providerName}/${collectionName}/`, {
        method: 'POST',
        json: outputs,
      })
      .then(() => context.logger.info('POST request successful'))
      .catch((error) => context.logger.error(`POST request failed: ${error}`));

    // Update the cache with the latest exported timestamp to track progress
    await context.cache.set('last_exported_timestamp', outputs[outputs.length - 1].timestamp.toString());
  }

  // Return the outputs for further use or debugging
  return outputs;
};

// Export the processor function for external use
exports.processor = processor;

// Export the stream handler, wrapping the processor using Corva SDK
exports.handler = new Corva().stream(processor);

1.5 Locally test your application

1.5.1 Open the processor.spec.js file in your project directory.
- This file contains the code that provides an event to the handler.
- Comment out the existing template, then add the testing code below.
Here's an example of what the processor.spec.js file should look like:
// Comment out the existing template

// const { StreamTimeEvent, StreamTimeRecord } = require('@corva/node-sdk');
// const { app_runner } = require('@corva/node-sdk/testing');

// const { processor } = require('..');

// test('processes event', async () => {
//   const event = new StreamTimeEvent({
//     company_id: 1,
//     asset_id: 1234,
//     records: [new StreamTimeRecord({ data: { bit_depth: 4980, hole_depth: 5000 }, timestamp: 1620905165 })],
//   });

//   expect(await app_runner(processor, event)).toBeUndefined();
// });


// Import the processor function from the main application file
const { processor } = require('../index'); // Adjust the path to your processor file

// Describe the test suite for the processor function
describe('processor', () => {
  // Define a test case for processing an event
  it('processes event', async () => {
    // Create a mock event object that simulates the data sent to the processor
    const event = {
      records: [
        {
          timestamp: 1625292000, // Example timestamp for the record
          data: {
            hook_load: 5000, // Example hook load value
            weight_on_bit: 10000, // Example weight on bit value
          },
        },
      ],
      company_id: 1, // Example company ID
      asset_id: 2, // Example asset ID
    };

    // Create a mock context object to simulate the runtime environment
    const context = {
      logger: {
        info: jest.fn(), // Mock logger.info method
        debug: jest.fn(), // Mock logger.debug method
        error: jest.fn(), // Mock logger.error method
      },
      api: {
        // Mock API request method to simulate a successful API call
        request: jest.fn().mockResolvedValue({ status: 200 }),
      },
      cache: {
        // Mock cache methods to simulate getting and setting cache values
        get: jest.fn().mockResolvedValue(null), // Simulate no cached value
        set: jest.fn().mockResolvedValue(null), // Simulate successful cache update
      },
    };

    // Call the processor function with the mock event and context
    const result = await processor(event, context);

    // Verify that the API request was made with the correct arguments
    expect(context.api.request).toHaveBeenCalledWith('api/v1/data/sample/sample-nodejs-stream-app-dataset/', {
      method: 'POST',
      json: result,
    });

    // Verify the calculated sum was added to the output record
    expect(result[0].data.wob_plus_hkld).toBe(15000);
  });
});

1.5.2 Execute the test.
- In the terminal command line run `npm test`.
1.5.3 Interpret the output.
- The test output will show whether the `jest` test passed or what errors occurred. See the example of a successful run below.
(base) ip-192-168-1-69:sample_nodejs_stream_app corvanaut$ npm test

> sample_sample_nodejs_stream_app@0.0.4 test
> yarn audit --groups dependencies && yarn unit

yarn audit v1.22.22
0 vulnerabilities found - Packages audited: 113
✨ Done in 0.66s.
yarn run v1.22.22
$ jest
PASS __tests__/processor.spec.js
processor
✓ processes event (3 ms)

Test Suites: 1 passed, 1 total
Tests: 1 passed, 1 total
Snapshots: 0 total
Time: 0.565 s, estimated 1 s
Ran all test suites.
✨ Done in 1.33s.

1.6 Deploy your application

Please see Getting Started section 4. Upload and Publish.


1.6.1 App Runner production testing

Please see App Runner section for more information.


1.7 Provision the application

For Company provisioning please see App Provisioning for more information.

For Corva Partners please see App Provisioning with a focus on App Purchases.